package com.bw;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.base.BaseApp;
import com.bw.gmall.realtime.common.bean.CartAddUuBean;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.funciton.DorisMapFunction;
import com.bw.gmall.realtime.common.util.DateFormatUtil;
import com.bw.gmall.realtime.common.util.FlinkSinkUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Flink job that counts, per 10-second event-time window, how many users added an
 * item to their cart for the first time on a given calendar day, and writes the
 * result to the Doris sink obtained for {@code Constant.DWS_TRADE_CART_ADD_UU_WINDOW}.
 *
 * <p>Pipeline: parse/clean JSON records, assign event-time watermarks, de-duplicate
 * per {@code user_id} and day using keyed state, sum the survivors in a global
 * tumbling window, stamp window metadata, sink.
 */
public class DwsTradeCartAddUuWindow extends BaseApp {
    /**
     * Entry point; delegates to {@code BaseApp.start}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): the meaning of the numeric arguments (presumably parallelism
        // and a port) is defined by BaseApp.start, which is not visible here.
        new DwsTradeCartAddUuWindow().start(
                Constant.TOPIC_DWD_TRADE_CART_ADD,
                Constant.DWS_TRADE_CART_ADD_UU_WINDOW,
                1,
                10026
        );
    }

    /**
     * Wires the processing pipeline on top of the Kafka source.
     *
     * @param env         the stream execution environment (not used directly here)
     * @param kafkaSource raw records as JSON strings
     */
    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> kafkaSource) {
        // 1. Parse and clean the raw records.
        SingleOutputStreamOperator<JSONObject> etlStream = etl(kafkaSource);
        // 2. Assign event-time timestamps and watermarks.
        SingleOutputStreamOperator<JSONObject> watermarksStream = getWatermarksStream(etlStream);
        // 3. Key by user and keep only each user's first cart-add of the day.
        SingleOutputStreamOperator<CartAddUuBean> firstAddStream = getKeyByStream(watermarksStream);
        // 4. Window and aggregate.
        SingleOutputStreamOperator<CartAddUuBean> reduceStream = getReduceStream(firstAddStream);
        // 5. Write to Doris.
        reduceStream.map(new DorisMapFunction<>()).sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_TRADE_CART_ADD_UU_WINDOW));
    }

    /**
     * Sums the per-user counts in a global 10-second tumbling event-time window and
     * stamps each result with the window start, window end and date.
     *
     * @param firstAddStream one bean per (user, day) first cart-add
     * @return one aggregated bean per window
     */
    private SingleOutputStreamOperator<CartAddUuBean> getReduceStream(SingleOutputStreamOperator<CartAddUuBean> firstAddStream) {
        return firstAddStream.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<CartAddUuBean>() {
                    @Override
                    public CartAddUuBean reduce(CartAddUuBean value1, CartAddUuBean value2) throws Exception {
                        // Accumulate into the first bean to avoid allocating a new one per record.
                        value1.setCartAddUuCt(value1.getCartAddUuCt() + value2.getCartAddUuCt());
                        return value1;
                    }
                }, new ProcessAllWindowFunction<CartAddUuBean, CartAddUuBean, TimeWindow>() {
                    @Override
                    public void process(ProcessAllWindowFunction<CartAddUuBean, CartAddUuBean, TimeWindow>.Context context, Iterable<CartAddUuBean> iterable, Collector<CartAddUuBean> collector) throws Exception {
                        TimeWindow window = context.window();
                        long start = window.getStart();
                        long end = window.getEnd();
                        for (CartAddUuBean cartAddUuBean : iterable) {
                            cartAddUuBean.setStt(DateFormatUtil.tsToDateTime(start));
                            cartAddUuBean.setEdt(DateFormatUtil.tsToDateTime(end));
                            // Derive the date from the window itself (event time), not the wall
                            // clock, so replayed or back-filled data gets a date consistent with
                            // its own stt/edt.
                            cartAddUuBean.setCurDate(DateFormatUtil.tsToDate(start));
                            collector.collect(cartAddUuBean);
                        }
                    }
                });
    }

    /**
     * Keys the stream by {@code user_id} and emits a bean with count 1 whenever the
     * date of the record's {@code ts} differs from the date last stored for that user,
     * i.e. the user's first cart-add seen for that date.
     *
     * @param watermarksStream cleaned records carrying {@code user_id} and millisecond {@code ts}
     * @return one {@link CartAddUuBean} per emitted first cart-add
     */
    private SingleOutputStreamOperator<CartAddUuBean> getKeyByStream(SingleOutputStreamOperator<JSONObject> watermarksStream) {
        return watermarksStream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject jsonObject) throws Exception {
                return jsonObject.getString("user_id");
            }
        }).process(new KeyedProcessFunction<String, JSONObject, CartAddUuBean>() {
            /** Date string of the last cart-add counted for the current user. */
            private ValueState<String> userAddState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("user_add", String.class);
                // The stored date only matters for one day, so let the state expire after
                // a day without writes to bound state size.
                valueStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1L))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
                userAddState = getRuntimeContext().getState(valueStateDescriptor);
            }

            @Override
            public void processElement(JSONObject jsonObject, KeyedProcessFunction<String, JSONObject, CartAddUuBean>.Context context, Collector<CartAddUuBean> collector) throws Exception {
                String curDt = DateFormatUtil.tsToDate(jsonObject.getLong("ts"));
                // Emit only when this user has no recorded cart-add for the record's date.
                if (!curDt.equals(userAddState.value())) {
                    userAddState.update(curDt);
                    collector.collect(CartAddUuBean
                            .builder()
                            .cartAddUuCt(1L)
                            .build());
                }
            }
        });
    }

    /**
     * Assigns event-time timestamps from the {@code ts} field and generates watermarks
     * that tolerate 3 seconds of out-of-orderness.
     *
     * @param etlStream cleaned records whose {@code ts} is already in milliseconds
     * @return the same records with timestamps and watermarks attached
     */
    private SingleOutputStreamOperator<JSONObject> getWatermarksStream(SingleOutputStreamOperator<JSONObject> etlStream) {
        return etlStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
            @Override
            public long extractTimestamp(JSONObject jsonObject, long l) {
                return jsonObject.getLong("ts");
            }
        }));
    }

    /**
     * Parses each record as JSON, drops records that are empty or lack {@code ts} or
     * {@code user_id}, and rewrites {@code ts} multiplied by 1000.
     *
     * @param kafkaSource raw JSON strings
     * @return parsed records with {@code ts} scaled by 1000
     */
    private SingleOutputStreamOperator<JSONObject> etl(DataStreamSource<String> kafkaSource) {
        return kafkaSource.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                if (jsonObject == null) {
                    // parseObject yields null for null/blank input; drop such records
                    // instead of failing the job with a NullPointerException.
                    return;
                }
                Long ts = jsonObject.getLong("ts");
                String userId = jsonObject.getString("user_id");
                if (ts != null && userId != null) {
                    // Convert the timestamp to milliseconds (presumably the source is in
                    // seconds — confirm against the upstream topic).
                    jsonObject.put("ts", ts * 1000);
                    collector.collect(jsonObject);
                }
            }
        });
    }
}
